Platform Explorer / Nuxeo Platform 2023.8

Component org.nuxeo.ecm.core.bulk.config

Requirements

Resolution Order

824
The resolution order represents the order in which this component has been resolved by the Nuxeo Runtime framework.
You can influence this order by adding "require" tags in your component declaration, to make sure it is resolved after another component.

Contributions

XML Source

<?xml version="1.0"?>
<component name="org.nuxeo.ecm.core.bulk.config" version="1.0.0">

  <require>org.nuxeo.runtime.stream.service</require>

  <!-- ======================================================================================= -->
  <!-- Ensure backward/forward compatibility by registering schema evolution -->
  <extension point="schema" target="org.nuxeo.runtime.avro">
    <schema name="Record-2023.0" file="avro/Record-0x5D82F1387F01C58B.avsc" />
    <schema name="BulkCommand-2023.0" file="avro/BulkCommand-0xF032C4AB0AF8B906.avsc" />
    <schema name="BulkCommand-2023.4" file="avro/BulkCommand-0xDF8D62C9EE6F08F9.avsc" />
    <schema name="BulkStatus-2023.0" file="avro/BulkStatus-0xDA29E947B01E64AA.avsc" />
    <schema name="DataBucket-2023.0" file="avro/DataBucket-0xEB04211B6C6C3B5F.avsc" />
    <schema name="BulkBucket-2023.0" file="avro/BulkBucket-0xCC59A5FF2725F7AF.avsc" />
    <schema name="NuxeoLogEvent-2023.7" file="avro/NuxeoLogEvent-0xCA271CCAAF986742.avsc" />
    <!-- Register 2021 schemas for smooth upgrade -->
    <schema name="Record-2021.0" file="avro/Record-0xEE727BE73E8D498.avsc" />
    <schema name="BulkCommand-2021.6" file="avro/BulkCommand-0x5F64BA8BF77C9B7D.avsc" />
    <schema name="BulkCommand-2021.46" file="avro/BulkCommand-0x66217E9613482336.avsc" />
    <schema name="BulkStatus-2021.7" file="avro/BulkStatus-0xA58F151AC3B3DEBF.avsc" />
    <schema name="DataBucket-2021.0" file="avro/DataBucket-0xB62494C74E419198.avsc" />
    <!-- Same schemas in 2023 -->
    <!-- schema name="BulkBucket-2021.0" file="avro/BulkBucket-0xCC59A5FF2725F7AF.avsc" / -->
  </extension>

  <!-- ======================================================================================= -->
  <!-- Bulk Service configuration -->

  <!-- Initialize bulk service streams before action processors -->
  <extension point="logConfig" target="org.nuxeo.runtime.stream.service">
    <logConfig>
      <!-- command stream size sets the maximum concurrency for the scroller computation in the Nuxeo cluster -->
      <log name="bulk/command" size="2" />
      <!-- status stream size sets the maximum concurrency for the status computation in the Nuxeo cluster -->
      <log name="bulk/status" size="1" />
      <!-- done stream size sets the maximum concurrency for the triggers computations -->
      <log name="bulk/done" size="1" />
    </logConfig>
  </extension>

  <!-- Configure the default scroller behavior -->
  <extension target="org.nuxeo.runtime.ConfigurationService" point="configuration">
    <property name="nuxeo.core.bulk.scroller.scroll.size">200</property>
    <property name="nuxeo.core.bulk.scroller.scroll.keepAliveSeconds">${nuxeo.core.bulk.scroller.scroll.keepAliveSeconds:=300}</property>
    <property name="nuxeo.core.bulk.scroller.transactionTimeout">2d</property>
    <property name="nuxeo.core.bulk.scroller.produceImmediate">false</property>
  </extension>

  <!-- Bulk service processor -->
  <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
    <streamProcessor name="bulkServiceProcessor" class="org.nuxeo.ecm.core.bulk.BulkServiceProcessor"
      defaultCodec="avro" defaultConcurrency="1" defaultPartitions="1" defaultExternal="true" start="false">
      <stream name="bulk/command" external="false" />
      <stream name="bulk/status" external="false" />
      <stream name="bulk/done" external="false" />
      <policy name="bulk/scroller" maxRetries="0" delay="1s" maxDelay="60s"
        continueOnFailure="${nuxeo.core.bulk.scroller.continueOnFailure:=false}" />
      <policy name="bulk/status" maxRetries="20" delay="1s" maxDelay="60s"
        continueOnFailure="${nuxeo.core.bulk.status.continueOnFailure:=false}" />
      <computation name="bulk/scroller" concurrency="${nuxeo.core.bulk.scroller.concurrency:=2}" />
      <computation name="bulk/status" concurrency="${nuxeo.core.bulk.status.concurrency:=1}" />
    </streamProcessor>

    <streamProcessor name="streamIntrospection" enabled="${metrics.streams.enabled:=false}"
      class="org.nuxeo.ecm.core.bulk.introspection.StreamIntrospectionProcessor" defaultCodec="avro"
      defaultConcurrency="1" defaultPartitions="1" />

  </extension>

  <!-- ======================================================================================= -->
  <!-- Actions configuration -->

  <extension target="org.nuxeo.ecm.core.bulk" point="actions">
    <!-- Internal -->
    <action name="trash" inputStream="bulk/trash" bucketSize="100" batchSize="25" sequentialScroll="true" />
    <action name="removeProxy" inputStream="bulk/removeProxy" bucketSize="100" batchSize="25" />
    <action name="setSystemProperties" inputStream="bulk/setSystemProperties" bucketSize="100" batchSize="25" />
    <!-- Exposed through REST API -->
    <action name="setProperties" inputStream="bulk/setProperties" bucketSize="100" batchSize="25" httpEnabled="true"
      validationClass="org.nuxeo.ecm.core.bulk.validation.SetPropertiesValidation" />
  </extension>

  <!-- Action's processor -->
  <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
    <!-- SetProperty processor -->
    <streamProcessor name="setProperties" class="org.nuxeo.ecm.core.bulk.action.SetPropertiesAction"
      defaultConcurrency="${nuxeo.bulk.action.setProperties.defaultConcurrency:=2}"
      defaultPartitions="${nuxeo.bulk.action.setProperties.defaultPartitions:=4}">
      <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false" />
    </streamProcessor>

    <!-- SetSystemProperty processor -->
    <streamProcessor name="setSystemProperties" class="org.nuxeo.ecm.core.bulk.action.SetSystemPropertiesAction"
      defaultConcurrency="${nuxeo.bulk.action.setSystemProperties.defaultConcurrency:=2}"
      defaultPartitions="${nuxeo.bulk.action.setSystemProperties.defaultPartitions:=4}">
      <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false" />
    </streamProcessor>

    <!-- RemoveProxy processor -->
    <streamProcessor name="removeProxy" class="org.nuxeo.ecm.core.bulk.action.RemoveProxyAction"
      defaultConcurrency="${nuxeo.bulk.action.removeProxy.defaultConcurrency:=2}"
      defaultPartitions="${nuxeo.bulk.action.removeProxy.defaultPartitions:=4}">
      <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false" />
    </streamProcessor>

    <!-- Trash processor -->
    <streamProcessor name="trash" class="org.nuxeo.ecm.core.bulk.action.TrashAction"
      defaultConcurrency="${nuxeo.bulk.action.trash.defaultConcurrency:=1}"
      defaultPartitions="${nuxeo.bulk.action.trash.defaultPartitions:=1}">
      <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false" />
    </streamProcessor>

  </extension>

</component>